In signal processing, data compression, source coding, or bit-rate reduction involves encoding information using fewer bits than the original representation. Compression can be either lossy or lossless. Lossless compression reduces bits by identifying and eliminating statistical redundancy. No information is lost in lossless compression. Lossy compression reduces bits by removing unnecessary or less important information. [1]
Entropy is a measure of unpredictability, so its value is inversely related to the compression capacity of a chain of symbols (e.g. a file). [2]
# Load Python libraries
import numpy as np
import math
import timeit
from collections import Counter
from scipy.stats import entropy
# Load Plot libraries
import seaborn as sns
import matplotlib.pyplot as plt
# Class HuffmanCode from scratch
class HuffmanCode:
    """Recursive Huffman coding over a symbol -> probability mapping.

    NOTE: symbols are assumed to be strings, because merged tree nodes
    are represented by concatenating the two symbol keys.
    """

    # Return a Huffman code for an ensemble with distribution p
    def get_code(self, p_symbols):
        """Return a Huffman code {symbol: bit-string} for the ensemble.

        The caller's mapping is NOT modified (it used to be normalized
        in place); weights that do not sum to 1 are normalized on a copy.
        """
        n = len(p_symbols)
        if n == 0:
            return dict()
        if n == 1:
            # Degenerate ensemble: a single symbol still needs one bit
            return dict(zip(p_symbols.keys(), ['1']))
        # Normalize a copy so the input distribution is left untouched
        p = dict(p_symbols)
        self._normalize_weights(p)
        return self._get_code(p)

    # (Private) Calculate Huffman code
    def _get_code(self, p):
        """Recursively build the Huffman code for distribution p (len >= 2)."""
        # Base case of only two symbols, assign 0 or 1 arbitrarily
        if len(p) == 2:
            return dict(zip(p.keys(), ['0', '1']))
        # Create a new distribution by merging the lowest-probability pair
        p_prime = p.copy()
        s1, s2 = self._get_lowest_prob_pair(p)
        p1, p2 = p_prime.pop(s1), p_prime.pop(s2)
        p_prime[s1 + s2] = p1 + p2
        # Recurse on the reduced distribution, then split the merged node
        code = self._get_code(p_prime)
        s1s2 = code.pop(s1 + s2)
        code[s1], code[s2] = s1s2 + '0', s1s2 + '1'
        return code

    # Return pair of symbols from distribution p with lowest probabilities
    def _get_lowest_prob_pair(self, p):
        """Return the two symbols with the smallest probabilities, or (None, None)."""
        # Ensure there are at least 2 symbols in the dist.
        if len(p) >= 2:
            sorted_p = sorted(p.items(), key=lambda x: x[1])
            return sorted_p[0][0], sorted_p[1][0]
        return (None, None)

    # Makes sure all weights add up to t_weight
    def _normalize_weights(self, p_symbols, t_weight=1.0):
        """Rescale the weights in place so they sum to t_weight.

        Bug fix: the original divided by the total only, so a t_weight
        other than 1.0 was checked but never applied to the scaling.
        """
        n = sum(p_symbols.values())
        if n != t_weight:
            for s in p_symbols:
                p_symbols[s] = p_symbols[s] * t_weight / n
# Create Huffman Code instance (module-level; shared by the compression helpers below)
hc = HuffmanCode()
# Function - Read file in low level (Bytes)
def get_file_bytes(file_path):
    """Return the whole content of file_path as a mutable bytearray.

    The 'with' statement closes the file even on error; the original
    trailing 'return None' was unreachable and has been removed.
    """
    with open(file_path, 'rb') as f:
        return bytearray(f.read())
# Function - Return shannon entropy
def entropy_shannon(labels, base=None):
    """Shannon entropy of a sequence of labels.

    base=None means natural log (nats); pass base=2 for bits.
    Only the frequency counts matter, so the unique values are discarded.
    """
    _, counts = np.unique(labels, return_counts=True)
    return entropy(counts, base=base)
# Loading target file
file_path = "../data/text/book-1.txt"
# Raw bytes of the book; basis for every entropy measurement below
text_byte_list = get_file_bytes(file_path)
print(len(text_byte_list), 'Bytes')
# Function - Calculates the compression percentage (%)
def calc_compression_percentage(curr_size, new_size, precision=14):
    """Percentage saved going from curr_size down to new_size.

    Raises ZeroDivisionError when curr_size is 0. The result is rounded
    to `precision` decimal places.
    """
    return round((curr_size - new_size) / curr_size * 100, precision)
# Calculate entropy of file (in bits per byte, hence base 2)
actual_entropy = entropy_shannon(text_byte_list, 2)
actual_entropy
# Theoretical compression percentage (%) implied by the entropy:
# 8 bits/byte is the maximum possible entropy of a byte stream
max_entropy = 8
compress_rate = calc_compression_percentage(max_entropy, actual_entropy)
print(compress_rate, '%')
# Function - Calculate code frequency
def get_term_freq(term_list):
    """Count occurrences of each term in term_list.

    Integer terms (e.g. byte values) are remapped to their chr() character
    so the keys match the string symbols HuffmanCode works with.
    """
    return {
        chr(key) if isinstance(key, int) else key: value
        for key, value in Counter(term_list).items()
    }
# Function - Build the compress file
def create_compress_file(byte_list, code_list):
    """Encode each byte with its Huffman code and return the joined bit string.

    Each byte value is passed through chr() so it matches the string keys
    of the code table; a missing symbol raises KeyError.
    """
    return "".join(code_list[chr(b)] for b in byte_list)
# Function - Compressing file
def get_compress_file(byte_list):
    """Huffman-compress byte_list.

    Returns (bit string of the compressed content, Huffman code table).
    The manual normalization pass was removed: HuffmanCode.get_code
    already normalizes the frequency weights itself, so normalizing the
    raw counts here was redundant and produced identical codes.
    """
    # Get symbols frequency (raw counts are fine as weights)
    term_freq = get_term_freq(byte_list)
    # Get Huffman coding
    h_code = hc.get_code(term_freq)
    # Compressing file with Huffman code
    compress_file = create_compress_file(byte_list, h_code)
    return compress_file, h_code
# Compressing initial text file
compress_file, h_code = get_compress_file(text_byte_list)
# Real compression percentage (%)
curr_size = len(text_byte_list)
# compress_file is a string of '0'/'1' characters, i.e. bits; divide by 8 for bytes
new_size = len(compress_file) / 8
compress_rate = calc_compression_percentage(curr_size, new_size)
print(compress_rate, '%')
The assumption is that by lowering (changing) the entropy of a file, it can be compressed more than with its current entropy.
A greedy approach with lossless compression will be used to reduce bits. Below are the functions that find the lowest-entropy configuration for a binary file.
# Function - Find the low entropy setup for target file
def find_low_entropy(byte_list, neighborhood, verbose):
    """Greedy search over all 256 XOR keys for the transform with the
    lowest Shannon entropy.

    Each output byte is the input byte XORed with the key, with the
    already-transformed bytes in the backward neighborhood, and with the
    original bytes in the forward neighborhood (restore_entropy inverts
    exactly this scheme). Returns {'byte_list', 'entropy', 'key'}.
    """
    start_time = timeit.default_timer()
    B = 256  # alphabet size of a byte
    best_entropy = math.log(B, 2)  # 8 bits: the worst case for byte data
    best_byte_list = []
    best_key = -1
    n = len(byte_list)
    if verbose:
        print('file size:', n, 'bytes')
        print('initial entropy:', best_entropy)
        print('neighborhood:', neighborhood)
    for curr_key in range(B):
        curr_byte_list = []
        for i in range(n):
            curr_value = byte_list[i] ^ curr_key
            for j in range(i - neighborhood, i + neighborhood + 1):
                if j < i and j >= 0:
                    # Backward: XOR with already-transformed neighbors
                    curr_value = curr_value ^ curr_byte_list[j]
                elif j > i and j < n:
                    # Forward: XOR with original (untransformed) neighbors
                    curr_value = curr_value ^ byte_list[j]
            curr_byte_list.append(curr_value)
        # Get current entropy (no defensive .copy() needed:
        # entropy_shannon does not modify its argument)
        curr_entropy = entropy_shannon(curr_byte_list, 2)
        # Update best option
        if curr_entropy < best_entropy:
            if verbose:
                print(' key:', curr_key, ', new entropy:', curr_entropy, ', diff:', (curr_entropy - best_entropy))
            best_entropy = curr_entropy
            best_key = curr_key
            best_byte_list = curr_byte_list
    # Elapsed time. Bug fix: timeit.default_timer() measures seconds,
    # so the old 'ms' label was wrong
    elapsed = timeit.default_timer() - start_time
    if verbose:
        print('elapsed time', elapsed, 'seconds')
    return {'byte_list': best_byte_list, 'entropy': best_entropy, 'key': best_key}
Now, we try to find the best entropy setting with a neighborhood of 2.
# Find file with low entropy (exhaustive search over the 256 XOR keys)
neighborhood = 2
best_option = find_low_entropy(text_byte_list, neighborhood, True)
# Show best entropy setup (entropy in bits/byte, and the winning XOR key)
best_option['entropy'], best_option['key']
# Save best byte list
best_byte_list = best_option['byte_list']
# Function - Create byte matrix
def create_byte_matrix(byte_list, row_len, col_len):
    """Lay byte_list out row-major into a zero-padded (row_len, col_len) matrix.

    Raises if byte_list holds more than row_len * col_len values.
    """
    matrix = np.zeros((row_len, col_len))
    data = np.array(byte_list)
    # Vectorized row-major fill: reshape(-1) is a view on the same buffer,
    # so this replaces the original per-element Python loop at C speed
    matrix.reshape(-1)[:len(data)] = data
    return matrix
# Function - Plot image in binary file
def plot_byte_matrix(matrix, plt_title):
    """Render a byte matrix as a labelled seaborn heatmap and show it."""
    figure, axis = plt.subplots(figsize=(14, 14))
    sns.heatmap(matrix, ax=axis)
    axis.set_title(plt_title, fontsize=16)
    axis.set_xlabel('columns', fontsize=12)
    axis.set_ylabel('rows', fontsize=12)
    plt.show()
# Create and plot original matrix
# NOTE(review): 1300 * 967 must be >= the file size in bytes — confirm for other inputs
row_len = 1300
col_len = 967
ori_matrix = create_byte_matrix(text_byte_list, row_len, col_len)
plot_byte_matrix(ori_matrix, 'Matrix of Original File')
# Create and plot the new matrix (with low entropy)
row_len = 1300
col_len = 967
ori_matrix = create_byte_matrix(best_byte_list, row_len, col_len)
plot_byte_matrix(ori_matrix, 'Matrix of New File')
# Save the new (low-entropy) file as txt, e.g. book-1-new.txt
temp_file_path = file_path.replace('.txt', '-new.txt')
with open(temp_file_path, 'w+b') as f:
    binary_format = bytearray(best_byte_list)
    f.write(binary_format)
# Calculating new theoretical compression percentage
# (entropy gained by the transform, relative to the original entropy)
compress_rate = calc_compression_percentage(actual_entropy, best_option['entropy'])
print(compress_rate, '%')
Below are the functions that allow you to reverse the entropy change.
# Restoring entropy process
def restore_entropy(byte_list, neighborhood, key):
    """Invert the transform applied by find_low_entropy.

    Walks the encoded bytes from last to first. Backward neighbors
    (j < i) are still in encoded form in byte_list — exactly the values
    the forward pass XORed in — while forward neighbors (j > i) have
    already been restored and live in new_byte_list, which is built in
    reverse order (hence the n - j - 1 index).
    """
    new_byte_list = []
    n = len(byte_list)
    for i in range(n - 1, -1, -1):
        curr_value = byte_list[i] ^ key
        for j in range(i - neighborhood, i + neighborhood + 1):
            if j < i and j >= 0:
                # Backward: encoded neighbor values
                curr_value = curr_value ^ byte_list[j]
            elif j > i and j < n:
                # Forward: already-restored original values (stored reversed)
                curr_value = curr_value ^ new_byte_list[n - j - 1]
        new_byte_list.append(curr_value)
    # Return the reversed list to restore original byte order
    return list(reversed(new_byte_list))
# Call the entropy restoration function
new_byte_list = restore_entropy(best_byte_list, neighborhood, best_option['key'])
# Comparing size of byte lists
len(new_byte_list) == len(text_byte_list)
# Comparing values of byte lists (0 expected)
# NOTE(review): equal sums do not prove element-wise equality;
# `new_byte_list == list(text_byte_list)` would be a stronger check
sum(new_byte_list) - sum(text_byte_list)
# Comparing entropies of byte lists (True expected if restoration is exact)
entropy_shannon(text_byte_list, 2) == entropy_shannon(new_byte_list, 2)
# Create and plot the matrix of the restored (original-content) file
row_len = 1300
col_len = 967
ori_matrix = create_byte_matrix(new_byte_list, row_len, col_len)
plot_byte_matrix(ori_matrix, 'Matrix of Original File')
# Save the restored file as txt, e.g. book-1-original.txt
temp_file_path = file_path.replace('.txt', '-original.txt')
with open(temp_file_path, 'w+b') as f:
    binary_format = bytearray(new_byte_list)
    f.write(binary_format)